---
title: prefect-dbt
---

With `prefect-dbt`, you can trigger and observe dbt Cloud jobs, execute dbt Core CLI commands, and incorporate other tools, such as [Snowflake](/integrations/prefect-snowflake/index), into your dbt runs.
Prefect provides a global view of the state of your workflows and allows you to take action based on state changes.

Prefect integrations may provide pre-built [blocks](/v3/develop/blocks), [flows](/v3/develop/write-flows), or [tasks](/v3/develop/write-tasks) for interacting with external systems.
Block types in this library allow you to do things such as run a dbt Cloud job or execute a dbt Core command.

## Getting started

### Prerequisites

- A [dbt Cloud account](https://cloud.getdbt.com/) if using dbt Cloud.

### Install `prefect-dbt`

The following command installs a version of `prefect-dbt` compatible with your installed version of `prefect`.
If you don't already have `prefect` installed, it installs the newest version of `prefect` as well.

```bash
pip install "prefect[dbt]"
```

Upgrade to the latest versions of `prefect` and `prefect-dbt`:

```bash
pip install -U "prefect[dbt]"
```

If necessary, see [additional installation options for dbt Core with BigQuery, Snowflake, and Postgres](#additional-installation-options).


### Register newly installed block types

Register the block types in the `prefect-dbt` module to make them available for use.

```bash
prefect block register -m prefect_dbt
```

## dbt Cloud

If you have an existing dbt Cloud job, use the pre-built `run_dbt_cloud_job` flow to trigger a job run and wait until it finishes. If some nodes fail, `run_dbt_cloud_job` can retry only the unsuccessful nodes. Before running this flow, save your dbt Cloud credentials to a `DbtCloudCredentials` block and create a `DbtCloudJob` block.


### Save dbt Cloud credentials to a block

Blocks can be [created through code](/v3/develop/blocks) or through the UI.

To create a dbt Cloud Credentials block:

1. Log into your [dbt Cloud account](https://cloud.getdbt.com/settings/profile).
2. Click **API Tokens** on the sidebar.
3. Copy a Service Token.
4. Copy the account ID from the URL: `https://cloud.getdbt.com/settings/accounts/<ACCOUNT_ID>`.
5. Create and run the following script, replacing the placeholders:

```python
from prefect_dbt.cloud import DbtCloudCredentials


DbtCloudCredentials(
    api_key="API-KEY-PLACEHOLDER",
    account_id="ACCOUNT-ID-PLACEHOLDER"
).save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
```

### Create a dbt Cloud job block

1. In dbt Cloud, click on **Deploy** -> **Jobs**.
2. Select a job.
3. Copy the job ID from the URL: `https://cloud.getdbt.com/deploy/<ACCOUNT_ID>/projects/<PROJECT_ID>/jobs/<JOB_ID>`
4. Create and run the following script, replacing the placeholders.

```python
from prefect_dbt.cloud import DbtCloudCredentials, DbtCloudJob


dbt_cloud_credentials = DbtCloudCredentials.load("CREDENTIALS-BLOCK-PLACEHOLDER")
dbt_cloud_job = DbtCloudJob(
    dbt_cloud_credentials=dbt_cloud_credentials,
    job_id="JOB-ID-PLACEHOLDER"
).save("JOB-BLOCK-NAME-PLACEHOLDER")
```
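
If you copy IDs out of dbt Cloud URLs often, a small helper can reduce copy-paste mistakes. The following is a minimal sketch using only the standard library; `parse_job_url` is a hypothetical helper (not part of `prefect-dbt`), and it assumes the URL follows the pattern shown in step 3 with numeric IDs.

```python
import re

# Hypothetical helper, not part of prefect-dbt.
# Assumes the URL shape from step 3 and that all three IDs are numeric.
JOB_URL_PATTERN = re.compile(
    r"/deploy/(?P<account_id>\d+)/projects/(?P<project_id>\d+)/jobs/(?P<job_id>\d+)"
)


def parse_job_url(url: str) -> dict[str, int]:
    """Return the account, project, and job IDs embedded in a dbt Cloud job URL."""
    match = JOB_URL_PATTERN.search(url)
    if match is None:
        raise ValueError(f"Not a dbt Cloud job URL: {url!r}")
    return {name: int(value) for name, value in match.groupdict().items()}


ids = parse_job_url("https://cloud.getdbt.com/deploy/123/projects/456/jobs/789")
print(ids["job_id"])
```

The returned `job_id` is the value to pass when creating the job block.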

### Run a dbt Cloud job and wait for completion

```python
import asyncio

from prefect import flow
from prefect_dbt.cloud import DbtCloudJob
from prefect_dbt.cloud.jobs import run_dbt_cloud_job


@flow
async def run_dbt_job_flow():
    result = await run_dbt_cloud_job(
        dbt_cloud_job=await DbtCloudJob.load("JOB-BLOCK-NAME-PLACEHOLDER"),
        targeted_retries=0,
    )
    # `result` has already been awaited above, so return it directly
    return result


if __name__ == "__main__":
    asyncio.run(run_dbt_job_flow())
```

## dbt Core

### prefect-dbt 0.7.0 and later

Versions 0.7.0 and later of `prefect-dbt` include the `PrefectDbtRunner` class, which provides an improved interface for running dbt Core commands with better logging, failure handling, and automatic asset lineage.

<Tip>
The `PrefectDbtRunner` is inspired by the `DbtRunner` from dbt Core, and its `invoke` method accepts the same arguments.
Refer to the [`DbtRunner` documentation](https://docs.getdbt.com/reference/programmatic-invocations) for more information on how to call `invoke`.
</Tip>
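
Because `PrefectDbtRunner` only exists in 0.7.0 and later, it can help to check the installed version before choosing an approach. This is a minimal sketch using the standard library's `importlib.metadata`; `supports_prefect_dbt_runner` is a hypothetical helper name, and it compares only the leading major and minor numbers of the version string.

```python
import re
from importlib.metadata import PackageNotFoundError, version


def supports_prefect_dbt_runner(version_string: str) -> bool:
    """Return True if a version string such as "0.7.1" is 0.7.0 or later."""
    # Only the leading "major.minor" digits are compared; any suffix is ignored.
    match = re.match(r"(\d+)\.(\d+)", version_string)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version_string!r}")
    major, minor = int(match.group(1)), int(match.group(2))
    return (major, minor) >= (0, 7)


try:
    installed = version("prefect-dbt")
except PackageNotFoundError:
    installed = None

if installed is None:
    print("prefect-dbt is not installed")
else:
    print(installed, supports_prefect_dbt_runner(installed))
```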

Basic usage:

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner().invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```

When calling `.invoke()` in a flow or task, each node in dbt's execution graph is reflected as a task in Prefect's execution graph.
Logs from each node will belong to the corresponding task, and each task's state is determined by the state of that node's execution.

```bash
15:54:59.119 | INFO    | Flow run 'imposing-partridge' - Found 8 models, 3 seeds, 18 data tests, 543 macros
15:54:59.134 | INFO    | Flow run 'imposing-partridge' - 
15:54:59.148 | INFO    | Flow run 'imposing-partridge' - Concurrency: 1 threads (target='dev')
15:54:59.164 | INFO    | Flow run 'imposing-partridge' - 
15:54:59.665 | INFO    | Task run 'model my_first_dbt_model' - 1 of 29 OK created sql table model main.my_first_dbt_model ..................... [OK in 0.18s]
15:54:59.671 | INFO    | Task run 'model my_first_dbt_model' - Finished in state Completed()
...
15:55:02.373 | ERROR   | Task run 'model product_metrics' -   Runtime Error in model product_metrics (models/marts/product/product_metrics.sql)
  Binder Error: Values list "o" does not have a column named "product_id"
  
  LINE 47:         on p.product_id = o.product_id
15:55:02.857 | ERROR   | Task run 'model product_metrics' - Finished in state Failed('Task run encountered an exception Exception: Node model.demo.product_metrics finished with status error')
```

<Warning>
The task runs created by calling `.invoke()` run separately from dbt Core, and do not affect dbt's execution behavior.
These tasks do not persist results and cannot be cached.

Use [dbt's native retry functionality](https://docs.getdbt.com/reference/commands/retry) in combination with [runtime data from `prefect`](/v3/how-to-guides/workflows/access-runtime-info) to retry failed nodes.

```python
from prefect import flow
from prefect.runtime.flow_run import get_run_count
from prefect_dbt import PrefectDbtRunner


@flow(retries=2)
def run_dbt():
    runner = PrefectDbtRunner()

    if get_run_count() == 1:
        runner.invoke(["build"])
    else:
        runner.invoke(["retry"])


if __name__ == "__main__":
    run_dbt()
```
</Warning>

#### Assets

Prefect Cloud maintains a graph of [assets](/v3/concepts/assets), objects produced by your workflows.

Any dbt seed, source or model will appear on your asset graph in Prefect Cloud once it has been executed using the `PrefectDbtRunner`.
The upstream dependencies of an asset materialized by `prefect-dbt` are derived from the `depends_on` field in dbt's `manifest.json`.

The asset's `key` will be its corresponding dbt resource's `relation_name`.
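
To make the relationship between `depends_on` and asset keys concrete, the sketch below walks a hand-written, heavily trimmed manifest and maps each node's `relation_name` to the `relation_name` of its upstream resources. It is an illustration only, not the implementation used inside `prefect-dbt`; the `upstream_relations` helper and the sample project contents are assumptions.

```python
def upstream_relations(manifest: dict) -> dict[str, list[str]]:
    """Map each node's relation_name to the relation_names it depends on."""
    # Sources are stored under their own top-level key in manifest.json.
    resources = {**manifest.get("sources", {}), **manifest.get("nodes", {})}
    lineage = {}
    for node in manifest.get("nodes", {}).values():
        key = node.get("relation_name")
        if not key:
            continue  # skip nodes that have no relation of their own
        parents = node.get("depends_on", {}).get("nodes", [])
        lineage[key] = [
            resources[parent]["relation_name"]
            for parent in parents
            if resources.get(parent, {}).get("relation_name")
        ]
    return lineage


# Hypothetical, trimmed manifest contents for illustration.
manifest = {
    "sources": {
        "source.demo.raw.orders": {"relation_name": '"dev"."raw"."orders"'},
    },
    "nodes": {
        "model.demo.stg_orders": {
            "relation_name": '"dev"."main"."stg_orders"',
            "depends_on": {"nodes": ["source.demo.raw.orders"]},
        },
        "model.demo.product_metrics": {
            "relation_name": '"dev"."main_marts"."product_metrics"',
            "depends_on": {"nodes": ["model.demo.stg_orders"]},
        },
    },
}

for key, parents in upstream_relations(manifest).items():
    print(key, "<-", parents)
```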

The `name` and `description` asset properties are populated by a dbt resource's name and description.

The `owners` asset property is populated if there is data assigned to the `owner` key under a resource's `meta` config.

```yaml
models:
  - name: product_metrics
    description: "Product metrics and categorization"
    config:
      meta:
        owner: "kevin-g"
```

Asset metadata is collected from the result of the node's execution.

```json
{
  "node_path": "marts/product/product_metrics.sql",
  "node_name": "product_metrics",
  "unique_id": "model.demo.product_metrics",
  "resource_type": "model",
  "materialized": "table",
  "node_status": "error",
  "node_started_at": "2025-06-26T20:55:05.661126",
  "node_finished_at": "2025-06-26T20:55:05.733257",
  "meta": {
    "owner": "kevin-g"
  },
  "node_relation": {
    "database": "dev",
    "schema": "main_marts",
    "alias": "product_metrics",
    "relation_name": "\"dev\".\"main_marts\".\"product_metrics\""
  }
}
```
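
As one way to put this metadata to use, the sketch below computes how long a node ran from its start and finish timestamps. It assumes you have the metadata available as JSON text (here, a trimmed copy of the example above); `node_duration_seconds` is a hypothetical helper, not part of `prefect-dbt`.

```python
import json
from datetime import datetime

# Trimmed copy of the example metadata, kept as JSON text for illustration.
metadata_json = """
{
  "node_name": "product_metrics",
  "node_status": "error",
  "node_started_at": "2025-06-26T20:55:05.661126",
  "node_finished_at": "2025-06-26T20:55:05.733257"
}
"""


def node_duration_seconds(metadata: dict) -> float:
    """Return the elapsed time between the node's start and finish timestamps."""
    started = datetime.fromisoformat(metadata["node_started_at"])
    finished = datetime.fromisoformat(metadata["node_finished_at"])
    return (finished - started).total_seconds()


metadata = json.loads(metadata_json)
duration = node_duration_seconds(metadata)
print(f"{metadata['node_name']}: {metadata['node_status']} after {duration:.3f}s")
```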

Optionally, the compiled code of a dbt model can be appended to the asset description.

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(include_compiled_code=True).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```


#### dbt settings

The `PrefectDbtSettings` class, based on Pydantic's `BaseSettings` class, automatically detects `DBT_`-prefixed environment variables that have a direct effect on the `PrefectDbtRunner` class.
If no environment variables are set, dbt's defaults are used.

Provide a `PrefectDbtSettings` instance to `PrefectDbtRunner` to customize dbt settings or override environment variables.

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt"
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```

#### Logging

The `PrefectDbtRunner` class maps all dbt log levels to standard Python logging levels, so filtering for log levels like `WARNING` or `ERROR` in the Prefect UI applies to dbt's logs.

By default, the logging level used by dbt is Prefect's logging level, which can be configured using the `PREFECT_LOGGING_LEVEL` Prefect setting.

The dbt logging level can be set independently from Prefect's by using the `DBT_LOG_LEVEL` environment variable, setting `log_level` in `PrefectDbtSettings`, or passing the `--log-level` flag or `log_level` kwarg to `.invoke()`.
Only logging levels of higher severity (more restrictive) than Prefect's logging level will have an effect.

```python
from dbt_common.events.base_types import EventLevel
from prefect import flow
from prefect_dbt import PrefectDbtRunner, PrefectDbtSettings


@flow
def run_dbt():
    PrefectDbtRunner(
        settings=PrefectDbtSettings(
            project_dir="test",
            profiles_dir="examples/run_dbt",
            log_level=EventLevel.ERROR, # explicitly choose a higher log level for dbt
        )
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```
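
The rule that only a more restrictive dbt level takes effect can be pictured as taking the higher of the two levels. The sketch below is a standard-library illustration of that rule, not the code `prefect-dbt` runs; the `DBT_TO_PYTHON_LEVEL` mapping and the `effective_level` helper are assumptions made for the example.

```python
import logging
from typing import Optional

# Assumed mapping from dbt level names to Python logging levels (illustrative only).
DBT_TO_PYTHON_LEVEL = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warn": logging.WARNING,
    "error": logging.ERROR,
}


def effective_level(prefect_level: str, dbt_level: Optional[str] = None) -> int:
    """Return the numeric level at which dbt logs would be emitted."""
    prefect_numeric = getattr(logging, prefect_level.upper())
    if dbt_level is None:
        # With no dbt-specific level, dbt follows Prefect's logging level.
        return prefect_numeric
    # A dbt level below Prefect's has no effect, so the higher level wins.
    return max(prefect_numeric, DBT_TO_PYTHON_LEVEL[dbt_level.lower()])


print(logging.getLevelName(effective_level("INFO", "error")))
print(logging.getLevelName(effective_level("WARNING", "debug")))
```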

#### `profiles.yml` templating

The `PrefectDbtRunner` class supports templating in your `profiles.yml` file, allowing you to reference Prefect blocks and variables that will be resolved at runtime.
This enables you to store sensitive credentials securely using Prefect blocks, and configure different targets based on the Prefect workspace.

For example, a Prefect variable called `target` can have a different value in development (`dev`) and production (`prod`) workspaces.
This allows you to use the same `profiles.yml` file to automatically reference a local DuckDB instance in development and a Snowflake instance in production.

```yaml
example:
  outputs:
    dev:
      type: duckdb
      path: dev.duckdb
      threads: 1

    prod:
      type: snowflake
      account: "{{ prefect.blocks.snowflake-credentials.warehouse-access.account }}"
      user: "{{ prefect.blocks.snowflake-credentials.warehouse-access.user }}"
      password: "{{ prefect.blocks.snowflake-credentials.warehouse-access.password }}"
      database: "{{ prefect.blocks.snowflake-connector.prod-connector.database }}"
      schema: "{{ prefect.blocks.snowflake-connector.prod-connector.schema }}"
      warehouse: "{{ prefect.blocks.snowflake-connector.prod-connector.warehouse }}"
      threads: 4

  target: "{{ prefect.variables.target }}"
```
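
To illustrate how the `target` line above can resolve differently per workspace, the sketch below substitutes `prefect.variables` placeholders from a plain dictionary. It is a simplified stand-in written for this example, not the resolver `prefect-dbt` uses, and it does not contact a Prefect workspace or handle block references; `resolve_variables` is a hypothetical helper.

```python
import re

# Matches placeholders of the form {{ prefect.variables.NAME }}.
PLACEHOLDER = re.compile(r"\{\{\s*prefect\.variables\.([\w-]+)\s*\}\}")


def resolve_variables(text: str, variables: dict[str, str]) -> str:
    """Replace prefect.variables placeholders with values from a dictionary."""

    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"No value for Prefect variable {name!r}")
        return variables[name]

    return PLACEHOLDER.sub(substitute, text)


line = 'target: "{{ prefect.variables.target }}"'
# The dictionaries below stand in for the variables of two different workspaces.
print(resolve_variables(line, {"target": "dev"}))
print(resolve_variables(line, {"target": "prod"}))
```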

#### Failure handling

By default, any dbt node execution failures cause the entire dbt run to raise an exception with a message containing detailed information about the failure.

```
Failures detected during invocation of dbt command 'build':
Test not_null_my_first_dbt_model_id failed with message: "Got 1 result, configured to fail if != 0"
```

The `PrefectDbtRunner`'s `raise_on_failure` option can be set to `False` to prevent failures in dbt from causing the failure of the flow or task in which `.invoke()` is called.

```python
from prefect import flow
from prefect_dbt import PrefectDbtRunner


@flow
def run_dbt():
    PrefectDbtRunner(
        raise_on_failure=False  # Failed tests will not fail the flow run
    ).invoke(["build"])


if __name__ == "__main__":
    run_dbt()
```


#### Native dbt configuration

You can disable automatic asset lineage detection for all resources in your dbt project config, or for specific resources in their own config:

```yaml
prefect:
  enable_assets: False
```

### prefect-dbt 0.6.6 and earlier

`prefect-dbt` supports a couple of ways to run dbt Core commands.
A `DbtCoreOperation` block will run the commands as shell commands, while other tasks use dbt's [Programmatic Invocation](#programmatic-invocation).

Optionally, specify the `project_dir`.
If `profiles_dir` is not set, the `DBT_PROFILES_DIR` environment variable will be used.
If `DBT_PROFILES_DIR` is not set, the default directory, `$HOME/.dbt/`, will be used.
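
The lookup order described above can be sketched in a few lines of standard-library Python. `resolve_profiles_dir` is a hypothetical helper written for illustration; it mirrors the documented order rather than reproducing the library's internal code.

```python
import os
from pathlib import Path
from typing import Optional


def resolve_profiles_dir(profiles_dir: Optional[str] = None) -> Path:
    """Pick the profiles directory: explicit value, then env var, then ~/.dbt."""
    if profiles_dir is not None:
        return Path(profiles_dir)
    env_value = os.environ.get("DBT_PROFILES_DIR")
    if env_value:
        return Path(env_value)
    return Path.home() / ".dbt"


print(resolve_profiles_dir("PROFILES-DIRECTORY-PLACEHOLDER"))
print(resolve_profiles_dir())
```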

#### Use an existing profile

If you have an existing dbt `profiles.yml` file, specify the `profiles_dir` where the file is located:

```python
from prefect import flow
from prefect_dbt.cli.commands import DbtCoreOperation


@flow
def trigger_dbt_flow() -> str:
    result = DbtCoreOperation(
        commands=["pwd", "dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER"
    ).run()
    return result


if __name__ == "__main__":
    trigger_dbt_flow()
```


If you are already using Prefect blocks such as the [Snowflake Connector block](/integrations/prefect-snowflake/index), you can use those blocks to [create a new `profiles.yml` with a `DbtCliProfile` block](#create-a-new-profile-with-blocks).


##### Use environment variables with Prefect secret blocks

If you use environment variables in `profiles.yml`, set a Prefect Secret block as an environment variable:

```python
import os
from prefect.blocks.system import Secret


secret_block = Secret.load("DBT_PASSWORD_PLACEHOLDER")

# Access the stored secret
DBT_PASSWORD = secret_block.get()
os.environ["DBT_PASSWORD"] = DBT_PASSWORD
```

This example `profiles.yml` file could then access that variable.

```yaml
profile:
  target: prod
  outputs:
    prod:
      type: postgres
      host: 127.0.0.1
      user: dbt_user
      # IMPORTANT: Make sure to quote the entire Jinja string here
      password: "{{ env_var('DBT_PASSWORD') }}"
```


#### Create a new `profiles.yml` file with blocks

If you don't have a `profiles.yml` file, you can use a `DbtCliProfile` block to create one.
Then, specify the `profiles_dir` where `profiles.yml` will be written.
Here's example code with placeholders:

```python
from prefect import flow
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation


@flow
def trigger_dbt_flow():
    dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
    with DbtCoreOperation(
        commands=["dbt debug", "dbt run"],
        project_dir="PROJECT-DIRECTORY-PLACEHOLDER",
        profiles_dir="PROFILES-DIRECTORY-PLACEHOLDER",
        dbt_cli_profile=dbt_cli_profile,
    ) as dbt_operation:
        dbt_process = dbt_operation.trigger()
        # do other things before waiting for completion
        dbt_process.wait_for_completion()
        result = dbt_process.fetch_result()
    return result


if __name__ == "__main__":
    trigger_dbt_flow()
```

<Warning> 
**Supplying the `dbt_cli_profile` argument will overwrite existing `profiles.yml` files**

If you already have a `profiles.yml` file in the specified `profiles_dir`, the file will be overwritten. If you do not specify a profiles directory, the `profiles.yml` file at `~/.dbt/` will be overwritten.
</Warning> 
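
If you want a safety net before letting a block overwrite `profiles.yml`, you could copy the existing file first. The sketch below uses only the standard library; `back_up_profiles` and the `profiles.yml.bak` file name are hypothetical choices for this example, and the demonstration runs against a temporary directory rather than your real profiles directory.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Optional


def back_up_profiles(profiles_dir: Optional[str] = None) -> Optional[Path]:
    """Copy an existing profiles.yml to profiles.yml.bak and return the backup path."""
    directory = Path(profiles_dir) if profiles_dir else Path.home() / ".dbt"
    profiles = directory / "profiles.yml"
    if not profiles.exists():
        return None  # nothing to back up
    backup = profiles.with_name("profiles.yml.bak")
    shutil.copy2(profiles, backup)
    return backup


# Demonstrate against a throwaway directory so no real files are touched.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "profiles.yml").write_text("example: {}\n")
    print(back_up_profiles(tmp).name)
```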


Visit the SDK reference in the side navigation to see other built-in `TargetConfigs` blocks.

If the desired service profile is not available, you can build one from the generic `TargetConfigs` class.

#### Programmatic Invocation

`prefect-dbt` has some pre-built tasks that use dbt's [programmatic invocation](https://docs.getdbt.com/reference/programmatic-invocations).  

For example:

```python
from prefect import flow
from prefect_dbt.cli.commands import trigger_dbt_cli_command, dbt_build_task


@flow
def dbt_build_flow():
    # Install package dependencies before building
    trigger_dbt_cli_command(
        command="dbt deps",
        project_dir="/Users/test/my_dbt_project_dir",
    )
    dbt_build_task(
        project_dir="/Users/test/my_dbt_project_dir",
        create_summary_artifact=True,
        summary_artifact_key="dbt-build-task-summary",
        extra_command_args=["--select", "foo_model"],
    )


if __name__ == "__main__":
    dbt_build_flow()
```

See the [SDK docs](https://reference.prefect.io/prefect_dbt/) for other pre-built tasks.

##### Create a summary artifact

These pre-built tasks can also create artifacts. These artifacts have extra information about dbt Core runs, such as messages and compiled code for nodes that fail or have errors.

![prefect-dbt Summary Artifact](/images/prefect-dbt-summary-artifact.png)

#### BigQuery CLI profile block example

To create dbt Core target config and profile blocks for BigQuery:

1. Save and load a `GcpCredentials` block.
2. Determine the schema / dataset you want to use in BigQuery.
3. Create a short script, replacing the placeholders.

```python
from prefect_gcp.credentials import GcpCredentials
from prefect_dbt.cli import BigQueryTargetConfigs, DbtCliProfile


credentials = GcpCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
target_configs = BigQueryTargetConfigs(
    schema="SCHEMA-NAME-PLACEHOLDER",  # also known as dataset
    credentials=credentials,
)
target_configs.save("TARGET-CONFIGS-BLOCK-NAME-PLACEHOLDER")

dbt_cli_profile = DbtCliProfile(
    name="PROFILE-NAME-PLACEHOLDER",
    target="TARGET-NAME-PLACEHOLDER",
    target_configs=target_configs,
)
dbt_cli_profile.save("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
```

To create a dbt Core operation block:

1. Determine the dbt commands you want to run.
2. Create a short script, replacing the placeholders.

```python
from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation


dbt_cli_profile = DbtCliProfile.load("DBT-CLI-PROFILE-BLOCK-NAME-PLACEHOLDER")
dbt_core_operation = DbtCoreOperation(
    commands=["DBT-CLI-COMMANDS-PLACEHOLDER"],
    dbt_cli_profile=dbt_cli_profile,
    overwrite_profiles=True,
)
dbt_core_operation.save("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")
```

Load the saved `DbtCoreOperation` block, which holds a reference to your profile and credentials:

```python
from prefect_dbt.cli import DbtCoreOperation


DbtCoreOperation.load("DBT-CORE-OPERATION-BLOCK-NAME-PLACEHOLDER")
```

## Resources

For assistance using dbt, consult the [dbt documentation](https://docs.getdbt.com/docs/building-a-dbt-project/documentation).

Refer to the `prefect-dbt` [SDK documentation](https://reference.prefect.io/prefect_dbt/) to explore all the capabilities of the `prefect-dbt` library.

### Additional installation options

Additional installation options for dbt Core with BigQuery, Snowflake, and Postgres are shown below.

#### Additional capabilities for dbt Core and Snowflake profiles

First install the main library compatible with your Prefect version:

```bash
pip install "prefect[dbt]"
```

Then install the additional capabilities you need.

```bash
pip install "prefect-dbt[snowflake]"
```

#### Additional capabilities for dbt Core and BigQuery profiles

```bash
pip install "prefect-dbt[bigquery]"
```

#### Additional capabilities for dbt Core and Postgres profiles

```bash
pip install "prefect-dbt[postgres]"
```

Or, install all of the extras.

```bash
pip install -U "prefect-dbt[all_extras]"
```